package com.xx.websocket.server;

import com.xx.dk.tbsalling.aismessages.ais.messages.handler.DynamicDataReportHandler;
import com.xx.util.ProtobufBatchSender;
import com.xx.web.proto.ShipOuterClass;
import com.xx.websocket.config.JsonEncoder;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.springframework.stereotype.Component;

import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicInteger;


/**
 * @author fmy
 * @version 1.0
 * @ClassName: WebSocketServer
 * @Description:
 * @Date: 2023/4/11 14:14
 * @since JDK 1.8
 */
@Slf4j
@Component
@ServerEndpoint(value = "/wb/track/{key}", encoders = JsonEncoder.class)
public class TrackPushServer {

    private static AtomicInteger onlineCount = new AtomicInteger(0);

    public static CopyOnWriteArraySet<TrackPushServer> webSocketSet = new CopyOnWriteArraySet<>();
    private static Map<String, Session> sessionPool = new HashMap<String, Session>();

    //private static final ReentrantLock lock = new ReentrantLock();
    private Session session;

    @OnOpen
    public void onOpen(Session session, @PathParam("key") String key) {
        this.session = session;
        sessionPool.put(key, session);
        //加入set
        webSocketSet.add(this);
        addOnlineCount();
        log.info("有新连接加入！当前在线人数为" + getOnlineCount() + " session: " + session.getId());

        try {
            synchronized (webSocketSet) {
                synchronized (DynamicDataReportHandler.queue.lock) {
                    Collection<ShipOuterClass.Ship> values = DynamicDataReportHandler.queue.values();
                    if (CollectionUtils.isNotEmpty(values)) {
                        log.info("推送:{}条",values.size());
                        session.getBasicRemote().sendBinary(ProtobufBatchSender.sendProtobufBatch(values.stream().toList()));
                    }
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @OnClose
    public void onClose(Session session, @PathParam("key") String key) {
        webSocketSet.remove(this);
        subOnlineCount();
        log.info("有一连接关闭！当前在线人数为" + getOnlineCount());
    }

    public void optClose(Session session) {
        if (session.isOpen()) {
            try {
                CloseReason closeReason = new CloseReason(CloseReason.CloseCodes.NORMAL_CLOSURE, "鉴权失败!");
                session.close(closeReason);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * 收到客户端消息后调用的方法
     * {"sendType":"message","data":""} message 表示发送的是消息 heart表示心跳 则data必须是ping
     *
     * @param message 客户端发送过来的消息
     */
    @OnMessage
    public void onMessage(String message, Session session, @PathParam("key") String key) {
        synchronized (webSocketSet) {
            if ("ping".equals(message)) {
                sendSyncMessage("pong");
            }
        }

    }

    /**
     * @param session
     * @param error
     */
    @OnError
    public void onError(Session session, Throwable error) {
        log.error("发生错误");
        error.printStackTrace();
    }

    /**
     * 群发自定义消息
     */
    public static void sendInfo(String message) {
        log.info(message);
        for (TrackPushServer item : webSocketSet) {
            try {
                item.sendASyncMessage(message);
            } catch (Exception e) {
                log.error("服务端给客户端群发消息失败: ", e);
            }
        }
    }

    /**
     * 发送信息
     *
     * @param message
     */
    private void sendSyncMessage(String message) {
        try {
            this.session.getBasicRemote().sendText(message);
        } catch (Exception e) {
            log.error("服务端给客户端发送消息失败: ", e);
        }
    }

    public void sendASyncMessage(byte[] message) {
        try {
            this.session.getAsyncRemote().sendBinary(ByteBuffer.wrap(message));
        } catch (Exception e) {
            log.error("服务端给客户端发送消息失败: ", e);
        }
    }

    /**
     * 发送信息
     *
     * @param message
     */
    private void sendASyncMessage(String message) {
        try {
            this.session.getAsyncRemote().sendText(message);
        } catch (Exception e) {
            log.error("服务端给客户端发送消息失败: ", e);
        }
    }

    // 此为广播消息
    public static void sendAllMessage(String message) {
        for (TrackPushServer webSocket : webSocketSet) {
            try {
                webSocket.session.getAsyncRemote().sendText(message);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    public static void sendAllMessage(ByteBuffer byteBuffer) {
        synchronized (webSocketSet) {
            for (TrackPushServer webSocket : webSocketSet) {
                try {
                    webSocket.session.getBasicRemote().sendBinary(byteBuffer);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }

    // 此为广播消息
    public static void sendAllObjMessage(Object message) {
        System.out.println(webSocketSet.size());
        for (TrackPushServer webSocket : webSocketSet) {
            try {
                webSocket.session.getAsyncRemote().sendObject(message);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    // 此为单点消息 (发送对象)
    public void sendObjMessage(Object message) {
        if (this.session != null) {
            try {
                this.session.getAsyncRemote().sendObject(message);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    public AtomicInteger getOnlineCount() {
        return onlineCount;
    }

    private void addOnlineCount() {
        onlineCount.incrementAndGet();
    }

    public void subOnlineCount() {
        onlineCount.decrementAndGet();
    }
}
